package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"

	_ "github.com/go-sql-driver/mysql"
	"github.com/olivere/elastic/v7"

	"dash/cmd/data/oldmessage"
	"dash/messages"
)

// Old2new converts a message in the legacy schema into a messages.Message.
//
// Every converted message gets the type "text" and the data source "微信".
// Each legacy topic is mapped to a messages.Topic whose Id and Name are both
// the legacy topic name, whose Score and KeywordScore are both the legacy
// topic score, and whose Keywords are the message-level TopicKeywords.
// CollectorId is not set here; the caller fills it in.
func Old2new(message oldmessage.Message) messages.Message {
	converted := make([]messages.Topic, 0, len(message.Topics))
	for _, legacy := range message.Topics {
		var t messages.Topic
		t.Id = legacy.Name
		t.Name = legacy.Name
		t.Score = legacy.Score
		t.KeywordScore = legacy.Score
		t.Keywords = message.TopicKeywords
		converted = append(converted, t)
	}

	var out messages.Message
	out.Id = message.Id
	out.Type = "text"
	out.DataSource = "微信"
	out.Content = message.MessageDetail.Text
	out.Timestamp = message.MessageDetail.Timestamp
	out.CaptureTimestamp = message.MessageDetail.CaptureTime
	out.Topics = converted
	out.Member = messages.Member{
		Id:       message.Member.Id,
		NickName: message.Member.Nickname,
		Sex:      message.Member.Sex,
	}
	out.Chatroom = messages.Chatroom{
		Id:   message.Chatroom.Id,
		Name: message.Chatroom.Name,
	}
	return out
}

// main migrates messages from the old Elasticsearch cluster into the new one
// and mirrors the members and chatrooms it encounters into MySQL.
//
// It scrolls over the "message" index of the old cluster in batches of 1000,
// ordered by message.timestamp ascending. Each hit is decoded as an
// oldmessage.Message, converted with Old2new and tagged with collectorId
// (NOTE(review): collectorId is not declared in this file; it is expected to
// come from elsewhere in package main). The first time a member, a chatroom or
// a chatroom/member pair is seen, the corresponding MySQL rows are inserted.
// Every converted message is indexed into "message" on the new cluster, and
// additionally into "alarm_message" when it carries at least one topic.
//
// Connection, statement-preparation, insert and transport errors abort the
// run with a panic. Hits that cannot be decoded are logged and skipped.
// Per-document bulk failures are reported on stderr without aborting.
func main() {
	// The run stops after this many batches even if the scroll has more data.
	const maxBatches = 11

	ctx := context.Background()

	oldEsClient, err := elastic.NewSimpleClient(elastic.SetURL("http://192.168.31.179:39200"))
	if err != nil {
		panic(err)
	}
	fmt.Println("old es client")

	newEsClient, err := elastic.NewSimpleClient(elastic.SetURL("http://192.168.31.36:19200"))
	if err != nil {
		panic(err)
	}
	fmt.Println("new es client")

	mysqlDB, err := sql.Open("mysql", "root:nil@tcp(192.168.31.36:13306)/dash")
	if err != nil {
		panic(err)
	}
	fmt.Println("mysql db")
	defer func() {
		_ = mysqlDB.Close()
	}()
	//prepare(mysqlDB)
	fmt.Println("mysql user prepared")

	// mustPrepare prepares a statement and panics when preparation fails.
	mustPrepare := func(query string) *sql.Stmt {
		stmt, err := mysqlDB.Prepare(query)
		if err != nil {
			panic(err)
		}
		return stmt
	}

	memberInsertStmt := mustPrepare(
		`INSERT INTO
        member(id, nickname, usedNickname, age, sex, dataSource)
        VALUES(?, ?, ?, ?, ?, ?);`)
	defer func() {
		_ = memberInsertStmt.Close()
	}()
	chatroomInsertStmt := mustPrepare(
		`INSERT INTO
        chatroom(id, name, dataSource)
        VALUES(?,?,?);`)
	defer func() {
		_ = chatroomInsertStmt.Close()
	}()
	chatroomMemberInsertStmt := mustPrepare(
		`INSERT INTO
        chatroom_member(chatroomId,memberId)
        VALUES(?,?);`)
	defer func() {
		_ = chatroomMemberInsertStmt.Close()
	}()
	collectorMemberInsertStmt := mustPrepare(
		`INSERT INTO
        collector_member(collectorId, memberId) VALUES (?,?);`)
	defer func() {
		_ = collectorMemberInsertStmt.Close()
	}()
	collectorChatroomInsertStmt := mustPrepare(
		`INSERT INTO collector_chatroom(collectorId, chatroomId) VALUES (?,?);`)
	defer func() {
		_ = collectorChatroomInsertStmt.Close()
	}()

	// Keys already written to MySQL during this run, so each row is inserted once.
	members := map[string]bool{}
	chatrooms := map[string]bool{}
	chatroomMembers := map[string]bool{}

	searchResult, err := oldEsClient.Scroll("message").Scroll("2m").Size(1000).Query(elastic.NewMatchAllQuery()).Sort("message.timestamp", true).Do(ctx)
	for batch := 0; ; batch++ {
		// The scroll service signals "no more documents" with io.EOF; that is
		// the normal end of the migration, not a failure.
		if errors.Is(err, io.EOF) {
			fmt.Println("finish")
			return
		}
		if err != nil {
			panic(err)
		}
		if searchResult.Hits == nil || len(searchResult.Hits.Hits) == 0 {
			fmt.Println("finish")
			// Return instead of os.Exit so the deferred Close calls run.
			return
		}
		hits := searchResult.Hits.Hits
		fmt.Println(batch, len(hits), hits[0].Id)

		var requests []elastic.BulkableRequest
		for _, hit := range hits {
			var message oldmessage.Message
			if err := json.Unmarshal(hit.Source, &message); err != nil {
				fmt.Println("err", err)
				continue
			}
			newMessage := Old2new(message)
			newMessage.CollectorId = collectorId

			member := newMessage.Member
			if !members[member.Id] {
				members[member.Id] = true
				// Store the current nickname both as the active nickname and
				// as the only entry of the used-nickname history.
				nickname := member.NickName
				usedNickname, err := json.Marshal([]string{nickname})
				if err != nil {
					panic(err)
				}
				if _, err := memberInsertStmt.Exec(member.Id, nickname, usedNickname, 0, member.Sex, "微信"); err != nil {
					panic(err)
				}
				if _, err := collectorMemberInsertStmt.Exec(collectorId, member.Id); err != nil {
					panic(err)
				}
			}

			chatroom := newMessage.Chatroom
			if !chatrooms[chatroom.Id] {
				chatrooms[chatroom.Id] = true
				if _, err := chatroomInsertStmt.Exec(chatroom.Id, chatroom.Name, "微信"); err != nil {
					panic(err)
				}
				if _, err := collectorChatroomInsertStmt.Exec(collectorId, chatroom.Id); err != nil {
					panic(err)
				}
			}

			membershipKey := chatroom.Id + "_" + member.Id
			if !chatroomMembers[membershipKey] {
				chatroomMembers[membershipKey] = true
				if _, err := chatroomMemberInsertStmt.Exec(chatroom.Id, member.Id); err != nil {
					panic(err)
				}
			}

			requests = append(
				requests,
				elastic.NewBulkIndexRequest().Index("message").Id(newMessage.Id).Doc(newMessage),
			)
			if len(newMessage.Topics) > 0 {
				requests = append(
					requests,
					elastic.NewBulkIndexRequest().Index("alarm_message").Id(newMessage.Id).Doc(newMessage),
				)
			}
		}

		// A bulk request without actions is rejected by the client, which can
		// happen when every hit of the batch failed to decode.
		if len(requests) > 0 {
			bulkResp, bulkErr := newEsClient.Bulk().Add(requests...).Do(ctx)
			if bulkErr != nil {
				panic(bulkErr)
			}
			// A successful round trip can still contain per-document failures.
			if bulkResp.Errors {
				for _, item := range bulkResp.Failed() {
					reason := ""
					if item.Error != nil {
						reason = item.Error.Reason
					}
					fmt.Fprintln(os.Stderr, "bulk index failed:", item.Index, item.Id, item.Status, reason)
				}
			}
		}

		if batch+1 >= maxBatches {
			break
		}
		searchResult, err = oldEsClient.Scroll("message").ScrollId(searchResult.ScrollId).Do(ctx)
	}
}
